/*
 * Copyright (c) 2022. China Mobile (SuZhou) Software Technology Co.,Ltd. All rights reserved.
 * Lakehouse is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

package com.chinamobile.cmss.lakehouse.engine.meta.crawler.model

import com.chinamobile.cmss.lakehouse.engine.meta.crawler.util.Utils


/**
 * The part of a [[Schema]] kept by `Schema.toSchemaKey`: the column list, the storage
 * format and the table properties. Name, path and partition information are not included.
 *
 * NOTE(review): presumably used to compare or group tables that share a layout
 * regardless of name and location -- confirm against the callers.
 *
 * @param columns       data columns of the table
 * @param storageFormat input/output format and serde of the table
 * @param properties    table properties
 */
case class SchemaKey(
  columns: List[ColumnInfo],
  storageFormat: StorageFormat,
  properties: Map[String, String]
)

/**
 * Description of a table together with the Hive statements that can be generated for it.
 *
 * @param name            table name, written verbatim into the generated statements
 * @param path            table location, written into the `LOCATION` clause
 * @param partitions      partition columns; each one is declared as `STRING` in the DDL
 * @param partitionValues partitions registered by [[toAddPartitionSql]]
 * @param columns         data columns of the table
 * @param storageFormat   input/output format and serde of the table
 * @param properties      entries written into the `TBLPROPERTIES` clause
 */
case class Schema(
  name: String,
  path: String,
  partitions: List[ColumnInfo],
  partitionValues: List[PartitionValue],
  columns: List[ColumnInfo],
  storageFormat: StorageFormat,
  properties: Map[String, String]
) {

  /** Keeps only the columns, the storage format and the table properties. */
  def toSchemaKey: SchemaKey =
    SchemaKey(columns, storageFormat, properties)

  /**
   * Builds one `alter table ... add partition(...) location '...'` statement per entry of
   * `partitionValues`, joined with `;`.
   *
   * @return `None` when there are no partition values, otherwise the joined statements
   */
  def toAddPartitionSql(): Option[String] =
    if (partitionValues.isEmpty) None
    else {
      val statements = partitionValues.map { value =>
        // zip pairs partition columns with values by position and stops at the shorter
        // of the two lists, so surplus columns or values are left out of the spec.
        val partitionSpec = partitions
          .zip(value.values)
          .map { case (column, partitionValue) =>
            Utils.surround("`", column.name) + "=" + Utils.surround("\"", partitionValue)
          }
          .mkString(",")
        s"alter table $name add partition($partitionSpec) location '${value.path}'"
      }
      Some(statements.mkString(";"))
    }

  /**
   * Builds the `CREATE EXTERNAL TABLE IF NOT EXISTS` statement for this table.
   *
   * Clause order: columns, `PARTITIONED BY` (only with partition columns), storage
   * clauses, `LOCATION`, `TBLPROPERTIES` (only with table properties). The default Parquet
   * format is written as `STORED AS PARQUET`; every other format is written as an optional
   * `ROW FORMAT SERDE` clause followed by explicit input and output format classes.
   *
   * @return the statement as a single line, clauses separated by one space
   * @throws NoSuchElementException if a non-default storage format lacks its input or
   *                                output format
   */
  def toHiveCreateSql(): String = {
    def quoted(text: String): String = Utils.surround("'", text)

    // Renders a map as ('k1'='v1','k2'='v2').
    def keyValueList(entries: Map[String, String]): String =
      entries
        .map { case (key, value) => quoted(key) + "=" + quoted(value) }
        .mkString("(", ",", ")")

    // Same exception type as Option.get, but the message names the table and the
    // missing part instead of the bare "None.get".
    def required(component: Option[String], label: String): String =
      component.getOrElse(
        throw new NoSuchElementException(
          s"Storage format of table $name does not define an $label"
        )
      )

    val createClause: List[String] = List(
      "CREATE EXTERNAL TABLE IF NOT EXISTS",
      name,
      columns
        .map(c => Utils.surround("`", c.name) + " " + c.columnType.hiveType)
        .mkString("(", ",", ")")
    )

    val partitionClause: List[String] =
      if (partitions.isEmpty) Nil
      else
        List(
          "PARTITIONED BY",
          partitions
            .map(p => Utils.surround("`", p.name) + " STRING")
            .mkString("(", ",", ")")
        )

    val storageClause: List[String] =
      if (storageFormat == StorageFormat.DEFAULT_PARQUET_FORMAT) List("STORED AS PARQUET")
      else {
        // Hive accepts WITH SERDEPROPERTIES only as a suffix of ROW FORMAT SERDE, so the
        // serde properties are written only when a serde class is present.
        val serdeClause: List[String] = storageFormat.serde.toList.flatMap { serde =>
          val serdeProperties =
            if (storageFormat.properties.isEmpty) Nil
            else List("WITH SERDEPROPERTIES", keyValueList(storageFormat.properties))
          List("ROW FORMAT SERDE", quoted(serde)) ::: serdeProperties
        }
        serdeClause ::: List(
          "STORED AS INPUTFORMAT",
          quoted(required(storageFormat.inputFormat, "input format")),
          "OUTPUTFORMAT",
          quoted(required(storageFormat.outputFormat, "output format"))
        )
      }

    val locationClause: List[String] = List("LOCATION", quoted(path))

    val tablePropertiesClause: List[String] =
      if (properties.isEmpty) Nil
      else List("TBLPROPERTIES", keyValueList(properties))

    (createClause ::: partitionClause ::: storageClause ::: locationClause ::: tablePropertiesClause)
      .mkString(" ")
  }
}

object Schema {

  /**
   * Creates a [[Schema]] without partitioning: both the partition columns and the
   * partition values are empty lists, every other field is taken from the arguments.
   *
   * @param name          table name
   * @param path          table location
   * @param columns       data columns of the table
   * @param storageFormat input/output format and serde of the table
   * @param properties    table properties
   * @return the unpartitioned schema
   */
  def simple(
    name: String,
    path: String,
    columns: List[ColumnInfo],
    storageFormat: StorageFormat,
    properties: Map[String, String]
  ): Schema =
    Schema(
      name = name,
      path = path,
      partitions = Nil,
      partitionValues = Nil,
      columns = columns,
      storageFormat = storageFormat,
      properties = properties
    )
}

/**
 * Physical storage description of a table.
 *
 * @param inputFormat  class name of the input format, if any
 * @param outputFormat class name of the output format, if any
 * @param serde        class name of the serde, if any
 * @param properties   key/value properties belonging to the serde
 */
case class StorageFormat(
  inputFormat: Option[String],
  outputFormat: Option[String],
  serde: Option[String],
  properties: Map[String, String]
)

object StorageFormat {

  /**
   * Hive's Parquet input format, output format and serde, with no serde properties.
   * `Schema.toHiveCreateSql` writes `STORED AS PARQUET` for a format equal to this value.
   */
  val DEFAULT_PARQUET_FORMAT: StorageFormat = StorageFormat(
    inputFormat = Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
    outputFormat = Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
    serde = Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"),
    properties = Map.empty
  )

  /** Text input/output formats combined with the HCatalog JSON serde, no serde properties. */
  val DEFAULT_JSON_FORMAT: StorageFormat = StorageFormat(
    inputFormat = Some("org.apache.hadoop.mapred.TextInputFormat"),
    outputFormat = Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"),
    serde = Some("org.apache.hive.hcatalog.data.JsonSerDe"),
    properties = Map.empty
  )

  // CSV counterpart, noted for reference only (no constant is defined for it here):
  //   input format:  org.apache.hadoop.mapred.TextInputFormat
  //   serde:         org.apache.hadoop.hive.serde2.OpenCSVSerde
  //   output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat

}
